{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Distributed (multi-function) pipeline example\n",
    "\n",
    "This example demonstrates how to run a pipeline that consists of multiple serverless functions (connected using streams).\n",
    "\n",
    "In the pipeline example the request contains the a URL of a file. It loads the content of the file and breaks it into paragraphs (using the FlatMap class), and pushes the results to a queue/stream. The second function picks up the paragraphs and runs the NLP flow to extract the entities and push the results to the output stream."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Setting the stream URLs for the internal queue, the final output and error/exceptions stream:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "streams_prefix = \"v3io:///users/admin/\"\n",
    "internal_stream = streams_prefix + \"in-stream\"\n",
    "out_stream = streams_prefix + \"out-stream\"\n",
    "err_stream = streams_prefix + \"err-stream\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternatively, using Kafka:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "kafka_prefix = f\"kafka://{broker}/\"\n",
    "internal_topic = kafka_prefix + \"in-topic\"\n",
    "out_topic = kafka_prefix + \"out-topic\"\n",
    "err_topic = kafka_prefix + \"err-topic\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In either case, continue with:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2021-05-03 14:28:39,987 [warning] Failed resolving version info. Ignoring and using defaults\n",
      "> 2021-05-03 14:28:43,801 [warning] Unable to parse server or client version. Assuming compatible: {'server_version': '0.6.3-rc4', 'client_version': 'unstable'}\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "('pipe', '/v3io/projects/{{run.project}}/artifacts')"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# set up the environment\n",
    "import mlrun\n",
    "\n",
    "project = mlrun.get_or_create_project(\"pipe\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# uncomment to install spacy requirements locally\n",
    "# !pip install spacy\n",
    "# !python -m spacy download en_core_web_sm"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**In this example**\n",
    "- [Create the pipeline](#create-the-pipeline)\n",
    "- [Test the pipeline locally](#test-the-pipeline-locally)\n",
    "- [Deploy to the cluster](#deploy-to-the-cluster)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create the pipeline\n",
    "\n",
    "The pipeline consists of two functions: data-prep and NLP. Each one has different package dependencies.\n",
    "\n",
    "**Create a file with data-prep graph steps:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting data_prep.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile data_prep.py\n",
    "import mlrun\n",
    "import json\n",
    "\n",
    "# load struct from a json file (event points to the url)\n",
    "def load_url(event):\n",
    "    url = event[\"url\"]\n",
    "    data = mlrun.get_object(url).decode(\"utf-8\")\n",
    "    return {\"url\": url, \"doc\": json.loads(data)}\n",
    "\n",
    "def to_paragraphs(event):\n",
    "    paragraphs = []\n",
    "    url = event[\"url\"]\n",
    "    for i, paragraph in enumerate(event[\"doc\"]):\n",
    "        paragraphs.append(\n",
    "            {\"url\": url, \"paragraph_id\": i, \"paragraph\": paragraph}\n",
    "        )\n",
    "    return paragraphs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Create a file with NLP graph steps (use spacy):**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting nlp.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile nlp.py\n",
    "import json\n",
    "import spacy\n",
    "\n",
    "def myprint(x):\n",
    "    print(x)\n",
    "    return x\n",
    "\n",
    "class ApplyNLP:\n",
    "    def __init__(self, context=None, spacy_dict=\"en_core_web_sm\"):\n",
    "\n",
    "        self.nlp = spacy.load(spacy_dict)\n",
    "\n",
    "    def do(self, paragraph: dict):\n",
    "        tokenized_paragraphs = []\n",
    "        if isinstance(paragraph, (str, bytes)):\n",
    "            paragraph = json.loads(paragraph)\n",
    "        tokenized = {\n",
    "            \"url\": paragraph[\"url\"],\n",
    "            \"paragraph_id\": paragraph[\"paragraph_id\"],\n",
    "            \"tokens\": self.nlp(paragraph[\"paragraph\"]),\n",
    "        }\n",
    "        tokenized_paragraphs.append(tokenized)\n",
    "\n",
    "        return tokenized_paragraphs\n",
    "\n",
    "def extract_entities(tokens):\n",
    "    paragraph_entities = []\n",
    "    for token in tokens:\n",
    "        entities = token[\"tokens\"].ents\n",
    "        for entity in entities:\n",
    "            paragraph_entities.append(\n",
    "                {\n",
    "                    \"url\": token[\"url\"],\n",
    "                    \"paragraph_id\": token[\"paragraph_id\"],\n",
    "                    \"entity\": entity.ents,\n",
    "                }\n",
    "            )\n",
    "    return paragraph_entities\n",
    "\n",
    "def enrich_entities(entities):\n",
    "    enriched_entities = []\n",
    "    for entity in entities:\n",
    "        enriched_entities.append(\n",
    "            {\n",
    "                \"url\": entity[\"url\"],\n",
    "                \"paragraph_id\": entity[\"paragraph_id\"],\n",
    "                \"entity_text\": entity[\"entity\"][0].text,\n",
    "                \"entity_start_char\": entity[\"entity\"][0].start_char,\n",
    "                \"entity_end_char\": entity[\"entity\"][0].end_char,\n",
    "                \"entity_label\": entity[\"entity\"][0].label_,\n",
    "            }\n",
    "        )\n",
    "    return enriched_entities"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Build and show the graph:**\n",
    "\n",
    "Create the master function (\"multi-func\") with the `data_prep.py` source and an async graph topology. \n",
    "Add a pipeline of steps made of custom python handlers, classes and built-in classes (like `storey.FlatMap`).\n",
    "\n",
    "The pipeline runs across two functions which are connected by a queue/stream (q1). Use the `function=` to specify which function runs the specified step.\n",
    "End the flow with writing to the output stream."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<mlrun.serving.states.QueueState at 0x7f9e618f9910>"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# define a new real-time serving function (from code) with an async graph\n",
    "fn = mlrun.code_to_function(\n",
    "    \"multi-func\", filename=\"./data_prep.py\", kind=\"serving\", image=\"mlrun/mlrun\"\n",
    ")\n",
    "graph = fn.set_topology(\"flow\", engine=\"async\")\n",
    "\n",
    "# define the graph steps (DAG)\n",
    "graph.to(name=\"load_url\", handler=\"load_url\").to(\n",
    "    name=\"to_paragraphs\", handler=\"to_paragraphs\"\n",
    ").to(\"storey.FlatMap\", \"flatten_paragraphs\", _fn=\"(event)\").to(\n",
    "    \">>\", \"q1\", path=internal_stream\n",
    ").to(\n",
    "    name=\"nlp\", class_name=\"ApplyNLP\", function=\"enrich\"\n",
    ").to(\n",
    "    name=\"extract_entities\", handler=\"extract_entities\", function=\"enrich\"\n",
    ").to(\n",
    "    name=\"enrich_entities\", handler=\"enrich_entities\", function=\"enrich\"\n",
    ").to(\n",
    "    \"storey.FlatMap\", \"flatten_entities\", _fn=\"(event)\", function=\"enrich\"\n",
    ").to(\n",
    "    name=\"printer\", handler=\"myprint\", function=\"enrich\"\n",
    ").to(\n",
    "    \">>\", \"output_stream\", path=out_stream\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
       " -->\n",
       "<!-- Title: mlrun&#45;flow Pages: 1 -->\n",
       "<svg width=\"1699pt\" height=\"44pt\"\n",
       " viewBox=\"0.00 0.00 1699.41 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
       "<title>mlrun&#45;flow</title>\n",
       "<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-40 1695.4076,-40 1695.4076,4 -4,4\"/>\n",
       "<!-- _start -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>_start</title>\n",
       "<polygon fill=\"#d3d3d3\" stroke=\"#000000\" points=\"38.5476,-.0493 40.698,-.1479 42.8263,-.2953 44.9236,-.4913 46.9815,-.7353 48.9917,-1.0266 50.9463,-1.3645 52.8377,-1.7479 54.6587,-2.1759 56.4025,-2.6472 58.0628,-3.1606 59.634,-3.7147 61.1107,-4.308 62.4882,-4.9388 63.7625,-5.6054 64.9302,-6.3059 65.9882,-7.0385 66.9343,-7.8012 67.7669,-8.5918 68.4849,-9.4082 69.0878,-10.2481 69.5758,-11.1093 69.9496,-11.9894 70.2102,-12.886 70.3595,-13.7965 70.3997,-14.7186 70.3334,-15.6497 70.1636,-16.5873 69.8937,-17.5287 69.5276,-18.4713 69.0691,-19.4127 68.5225,-20.3503 67.8923,-21.2814 67.1831,-22.2035 66.3996,-23.114 65.5464,-24.0106 64.6285,-24.8907 63.6504,-25.7519 62.617,-26.5918 61.5329,-27.4082 60.4024,-28.1988 59.2299,-28.9615 58.0197,-29.6941 56.7755,-30.3946 55.5012,-31.0612 54.2002,-31.692 52.8757,-32.2853 51.5309,-32.8394 50.1684,-33.3528 48.7908,-33.8241 47.4003,-34.2521 45.9989,-34.6355 44.5886,-34.9734 43.1708,-35.2647 41.7472,-35.5087 40.3189,-35.7047 38.8872,-35.8521 37.4531,-35.9507 36.0175,-36 34.5815,-36 33.146,-35.9507 31.7119,-35.8521 30.2801,-35.7047 28.8519,-35.5087 27.4282,-35.2647 26.0105,-34.9734 24.6001,-34.6355 23.1988,-34.2521 21.8083,-33.8241 20.4306,-33.3528 19.0681,-32.8394 17.7233,-32.2853 16.3989,-31.692 15.0979,-31.0612 13.8236,-30.3946 12.5794,-29.6941 11.3691,-28.9615 10.1967,-28.1988 9.0662,-27.4082 7.982,-26.5918 6.9486,-25.7519 5.9706,-24.8907 5.0526,-24.0106 4.1995,-23.114 3.4159,-22.2035 2.7067,-21.2814 2.0765,-20.3503 1.53,-19.4127 1.0715,-18.4713 .7053,-17.5287 .4355,-16.5873 .2657,-15.6497 .1993,-14.7186 .2395,-13.7965 .3888,-12.886 .6495,-11.9894 1.0232,-11.1093 1.5112,-10.2481 2.1141,-9.4082 2.8321,-8.5918 3.6647,-7.8012 4.6109,-7.0385 5.6689,-6.3059 6.8365,-5.6054 8.1108,-4.9388 9.4884,-4.308 10.9651,-3.7147 12.5362,-3.1606 14.1966,-2.6472 15.9404,-2.1759 17.7614,-1.7479 19.6528,-1.3645 21.6074,-1.0266 23.6176,-.7353 25.6755,-.4913 27.7728,-.2953 29.901,-.1479 32.0515,-.0493 34.2154,0 36.3837,0 38.5476,-.0493\"/>\n",
       "<text text-anchor=\"middle\" x=\"35.2995\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">start</text>\n",
       "</g>\n",
       "<!-- load_url -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>load_url</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"154.6948\" cy=\"-18\" rx=\"48.1917\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"154.6948\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">load_url</text>\n",
       "</g>\n",
       "<!-- _start&#45;&gt;load_url -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>_start&#45;&gt;load_url</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M69.8336,-18C78.1401,-18 87.2422,-18 96.3041,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"96.4737,-21.5001 106.4736,-18 96.4736,-14.5001 96.4737,-21.5001\"/>\n",
       "</g>\n",
       "<!-- to_paragraphs -->\n",
       "<g id=\"node3\" class=\"node\">\n",
       "<title>to_paragraphs</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"316.7836\" cy=\"-18\" rx=\"77.9862\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"316.7836\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">to_paragraphs</text>\n",
       "</g>\n",
       "<!-- load_url&#45;&gt;to_paragraphs -->\n",
       "<g id=\"edge2\" class=\"edge\">\n",
       "<title>load_url&#45;&gt;to_paragraphs</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M202.8879,-18C211.092,-18 219.8367,-18 228.6945,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"228.7205,-21.5001 238.7204,-18 228.7204,-14.5001 228.7205,-21.5001\"/>\n",
       "</g>\n",
       "<!-- flatten_paragraphs -->\n",
       "<g id=\"node4\" class=\"node\">\n",
       "<title>flatten_paragraphs</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"530.2179\" cy=\"-18\" rx=\"99.3824\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"530.2179\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">flatten_paragraphs</text>\n",
       "</g>\n",
       "<!-- to_paragraphs&#45;&gt;flatten_paragraphs -->\n",
       "<g id=\"edge3\" class=\"edge\">\n",
       "<title>to_paragraphs&#45;&gt;flatten_paragraphs</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M395.0467,-18C403.3413,-18 411.8834,-18 420.4516,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"420.5089,-21.5001 430.5088,-18 420.5088,-14.5001 420.5089,-21.5001\"/>\n",
       "</g>\n",
       "<!-- q1 -->\n",
       "<g id=\"node5\" class=\"node\">\n",
       "<title>q1</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"707.659,-30 665.659,-30 665.659,-6 707.659,-6 719.659,-18 707.659,-30\"/>\n",
       "<text text-anchor=\"middle\" x=\"692.659\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">q1</text>\n",
       "</g>\n",
       "<!-- flatten_paragraphs&#45;&gt;q1 -->\n",
       "<g id=\"edge4\" class=\"edge\">\n",
       "<title>flatten_paragraphs&#45;&gt;q1</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M629.855,-18C638.7948,-18 647.4203,-18 655.2668,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"655.44,-21.5001 665.44,-18 655.44,-14.5001 655.44,-21.5001\"/>\n",
       "</g>\n",
       "<!-- nlp -->\n",
       "<g id=\"node6\" class=\"node\">\n",
       "<title>nlp</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"782.659\" cy=\"-18\" rx=\"27\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"782.659\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">nlp</text>\n",
       "</g>\n",
       "<!-- q1&#45;&gt;nlp -->\n",
       "<g id=\"edge5\" class=\"edge\">\n",
       "<title>q1&#45;&gt;nlp</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M719.662,-18C727.6867,-18 736.6256,-18 745.1899,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"745.3641,-21.5001 755.3641,-18 745.364,-14.5001 745.3641,-21.5001\"/>\n",
       "</g>\n",
       "<!-- extract_entities -->\n",
       "<g id=\"node7\" class=\"node\">\n",
       "<title>extract_entities</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"930.1515\" cy=\"-18\" rx=\"84.485\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"930.1515\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">extract_entities</text>\n",
       "</g>\n",
       "<!-- nlp&#45;&gt;extract_entities -->\n",
       "<g id=\"edge6\" class=\"edge\">\n",
       "<title>nlp&#45;&gt;extract_entities</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M809.7114,-18C817.3456,-18 826.1139,-18 835.3642,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"835.5601,-21.5001 845.56,-18 835.56,-14.5001 835.5601,-21.5001\"/>\n",
       "</g>\n",
       "<!-- enrich_entities -->\n",
       "<g id=\"node8\" class=\"node\">\n",
       "<title>enrich_entities</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"1131.2369\" cy=\"-18\" rx=\"80.6858\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"1131.2369\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">enrich_entities</text>\n",
       "</g>\n",
       "<!-- extract_entities&#45;&gt;enrich_entities -->\n",
       "<g id=\"edge7\" class=\"edge\">\n",
       "<title>extract_entities&#45;&gt;enrich_entities</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M1014.847,-18C1023.2409,-18 1031.786,-18 1040.2402,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"1040.4912,-21.5001 1050.4911,-18 1040.4911,-14.5001 1040.4912,-21.5001\"/>\n",
       "</g>\n",
       "<!-- flatten_entities -->\n",
       "<g id=\"node9\" class=\"node\">\n",
       "<title>flatten_entities</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"1329.0725\" cy=\"-18\" rx=\"81.4863\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"1329.0725\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">flatten_entities</text>\n",
       "</g>\n",
       "<!-- enrich_entities&#45;&gt;flatten_entities -->\n",
       "<g id=\"edge8\" class=\"edge\">\n",
       "<title>enrich_entities&#45;&gt;flatten_entities</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M1211.9896,-18C1220.345,-18 1228.8796,-18 1237.3435,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"1237.616,-21.5001 1247.6159,-18 1237.6159,-14.5001 1237.616,-21.5001\"/>\n",
       "</g>\n",
       "<!-- printer -->\n",
       "<g id=\"node10\" class=\"node\">\n",
       "<title>printer</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"1489.8615\" cy=\"-18\" rx=\"43.5923\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"1489.8615\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">printer</text>\n",
       "</g>\n",
       "<!-- flatten_entities&#45;&gt;printer -->\n",
       "<g id=\"edge9\" class=\"edge\">\n",
       "<title>flatten_entities&#45;&gt;printer</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M1410.4091,-18C1419.0624,-18 1427.7012,-18 1435.9243,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"1436.0878,-21.5001 1446.0878,-18 1436.0878,-14.5001 1436.0878,-21.5001\"/>\n",
       "</g>\n",
       "<!-- output_stream -->\n",
       "<g id=\"node11\" class=\"node\">\n",
       "<title>output_stream</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"1679.4076,-30 1569.4076,-30 1569.4076,-6 1679.4076,-6 1691.4076,-18 1679.4076,-30\"/>\n",
       "<text text-anchor=\"middle\" x=\"1630.4076\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">output_stream</text>\n",
       "</g>\n",
       "<!-- printer&#45;&gt;output_stream -->\n",
       "<g id=\"edge10\" class=\"edge\">\n",
       "<title>printer&#45;&gt;output_stream</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M1533.5607,-18C1541.6692,-18 1550.3313,-18 1559.0161,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"1559.19,-21.5001 1569.19,-18 1559.19,-14.5001 1559.19,-21.5001\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.dot.Digraph at 0x7f9dd5dbed90>"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# specify the \"enrich\" child function, add extra package requirements\n",
    "child = fn.add_child_function(\"enrich\", \"./nlp.py\", \"mlrun/mlrun\")\n",
    "child.spec.build.commands = [\n",
    "    \"python -m pip install spacy\",\n",
    "    \"python -m spacy download en_core_web_sm\",\n",
    "]\n",
    "graph.plot(rankdir=\"LR\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Test the pipeline locally\n",
    "\n",
    "**Create an input file:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting in.json\n"
     ]
    }
   ],
   "source": [
    "%%writefile in.json\n",
    "[\"Born and raised in Queens, New York City, Trump attended Fordham University for two years and received a bachelor's degree in economics from the Wharton School of the University of Pennsylvania. He became president of his father Fred Trump's real estate business in 1971, renamed it The Trump Organization, and expanded its operations to building or renovating skyscrapers, hotels, casinos, and golf courses. Trump later started various side ventures, mostly by licensing his name. Trump and his businesses have been involved in more than 4,000 state and federal legal actions, including six bankruptcies. He owned the Miss Universe brand of beauty pageants from 1996 to 2015, and produced and hosted the reality television series The Apprentice from 2004 to 2015.\", \n",
    " \"Trump's political positions have been described as populist, protectionist, isolationist, and nationalist. He entered the 2016 presidential race as a Republican and was elected in a surprise electoral college victory over Democratic nominee Hillary Clinton while losing the popular vote.[a] He became the oldest first-term U.S. president[b] and the first without prior military or government service. His election and policies have sparked numerous protests. Trump has made many false or misleading statements during his campaign and presidency. The statements have been documented by fact-checkers, and the media have widely described the phenomenon as unprecedented in American politics. Many of his comments and actions have been characterized as racially charged or racist.\"]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Create a mock server (simulator) and test:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# tuggle verbosity if needed\n",
    "fn.verbose = False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "to\n",
    "# create a mock server (simulator), specify to simulate all the functions in the pipeline (\"*\")\n",
    "server = fn.to_mock_server(current_function=\"*\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Queens', 'entity_start_char': 19, 'entity_end_char': 25, 'entity_label': 'GPE'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'New York City', 'entity_start_char': 27, 'entity_end_char': 40, 'entity_label': 'GPE'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Trump', 'entity_start_char': 42, 'entity_end_char': 47, 'entity_label': 'ORG'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Fordham University', 'entity_start_char': 57, 'entity_end_char': 75, 'entity_label': 'ORG'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'two years', 'entity_start_char': 80, 'entity_end_char': 89, 'entity_label': 'DATE'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'the Wharton School of the University of Pennsylvania', 'entity_start_char': 141, 'entity_end_char': 193, 'entity_label': 'ORG'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Fred Trump', 'entity_start_char': 229, 'entity_end_char': 239, 'entity_label': 'PERSON'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': '1971', 'entity_start_char': 266, 'entity_end_char': 270, 'entity_label': 'DATE'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'The Trump Organization', 'entity_start_char': 283, 'entity_end_char': 305, 'entity_label': 'ORG'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'more than 4,000', 'entity_start_char': 529, 'entity_end_char': 544, 'entity_label': 'CARDINAL'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'six', 'entity_start_char': 588, 'entity_end_char': 591, 'entity_label': 'CARDINAL'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Universe', 'entity_start_char': 624, 'entity_end_char': 632, 'entity_label': 'PERSON'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': '1996 to 2015', 'entity_start_char': 663, 'entity_end_char': 675, 'entity_label': 'DATE'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'The Apprentice', 'entity_start_char': 731, 'entity_end_char': 745, 'entity_label': 'WORK_OF_ART'}\n",
      "{'url': 'in.json', 'paragraph_id': 0, 'entity_text': '2004 to 2015', 'entity_start_char': 751, 'entity_end_char': 763, 'entity_label': 'DATE'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Trump', 'entity_start_char': 0, 'entity_end_char': 5, 'entity_label': 'ORG'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': '2016', 'entity_start_char': 122, 'entity_end_char': 126, 'entity_label': 'DATE'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Republican', 'entity_start_char': 150, 'entity_end_char': 160, 'entity_label': 'NORP'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Democratic', 'entity_start_char': 222, 'entity_end_char': 232, 'entity_label': 'NORP'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Hillary Clinton', 'entity_start_char': 241, 'entity_end_char': 256, 'entity_label': 'PERSON'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'first', 'entity_start_char': 312, 'entity_end_char': 317, 'entity_label': 'ORDINAL'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'U.S.', 'entity_start_char': 323, 'entity_end_char': 327, 'entity_label': 'GPE'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'first', 'entity_start_char': 349, 'entity_end_char': 354, 'entity_label': 'ORDINAL'}\n",
      "{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'American', 'entity_start_char': 671, 'entity_end_char': 679, 'entity_label': 'NORP'}\n"
     ]
    }
   ],
   "source": [
    "# push a sample request into the pipeline and see the results print out (by the printer step)\n",
    "resp = server.test(body={\"url\": \"in.json\"})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "server.wait_for_completion()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Deploy to the cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2021-05-03 14:33:55,400 [info] deploy child function enrich ...\n",
      "> 2021-05-03 14:33:55,427 [info] Starting remote function deploy\n",
      "2021-05-03 14:33:55  (info) Deploying function\n",
      "2021-05-03 14:33:55  (info) Building\n",
      "2021-05-03 14:33:55  (info) Staging files and preparing base images\n",
      "2021-05-03 14:33:55  (info) Building processor image\n",
      "2021-05-03 14:34:02  (info) Build complete\n",
      "2021-05-03 14:34:08  (info) Function deploy complete\n",
      "> 2021-05-03 14:34:09,232 [info] function deployed, address=default-tenant.app.yh30.iguazio-c0.com:32356\n",
      "> 2021-05-03 14:34:09,233 [info] deploy root function multi-func ...\n",
      "> 2021-05-03 14:34:09,234 [info] Starting remote function deploy\n",
      "2021-05-03 14:34:09  (info) Deploying function\n",
      "2021-05-03 14:34:09  (info) Building\n",
      "2021-05-03 14:34:09  (info) Staging files and preparing base images\n",
      "2021-05-03 14:34:09  (info) Building processor image\n",
      "2021-05-03 14:34:16  (info) Build complete\n",
      "2021-05-03 14:34:22  (info) Function deploy complete\n",
      "> 2021-05-03 14:34:22,891 [info] function deployed, address=default-tenant.app.yh30.iguazio-c0.com:32046\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'http://default-tenant.app.yh30.iguazio-c0.com:32046'"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# add credentials to the data/streams\n",
    "fn.apply(mlrun.platforms.v3io_cred())\n",
    "child.apply(mlrun.platforms.v3io_cred())\n",
    "\n",
    "# specify the error stream (to store exceptions from the functions)\n",
    "fn.spec.error_stream = err_stream\n",
    "\n",
    "# deploy as a set of serverless functions\n",
    "fn.deploy()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Listen on the output stream**\n",
    "\n",
    "You can use the SDK or CLI to listen on the output stream. Listening should be done in a separate console/notebook. Run:\n",
    "\n",
    "    mlrun watch-stream v3io:///users/admin/out-stream -j\n",
    "\n",
    "or use the SDK:\n",
    "```python\n",
    "from mlrun.platforms import watch_stream\n",
    "watch_stream(\"v3io:///users/admin/out-stream\", is_json=True)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Test the live function:**\n",
    "```{Admonition} Note  \n",
    "The url must be a valid path to the input file.\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'id': '79354e45-a158-405f-811c-976e9cf4ab5e'}"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn.invoke(\"\", body={\"url\": \"v3io:///users/admin/pipe/in.json\"})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
